/**
 * Copyright (c) 2015, 玛雅牛［李飞］ (myaniu@gmail.com).
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.jfinal.plugin.zbus;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import org.reflections.Reflections;
import org.reflections.scanners.SubTypesScanner;
import org.reflections.scanners.TypeAnnotationsScanner;
import org.reflections.util.ClasspathHelper;
import org.reflections.util.ConfigurationBuilder;
import org.zbus.broker.Broker;
import org.zbus.broker.BrokerConfig;
import org.zbus.broker.ZbusBroker;
import org.zbus.mq.Consumer;
import org.zbus.mq.Producer;
import org.zbus.mq.Protocol.MqMode;

import com.jfinal.kit.StrKit;
import com.jfinal.log.Log;
import com.jfinal.plugin.IPlugin;
import com.jfinal.plugin.zbus.annotation.Event;
import com.jfinal.plugin.zbus.annotation.Handler;
import com.jfinal.plugin.zbus.annotation.Mq;
import com.jfinal.plugin.zbus.annotation.Mqs;
import com.jfinal.plugin.zbus.annotation.Topic;
import com.jfinal.plugin.zbus.annotation.Topics;
import com.jfinal.plugin.zbus.handler.TMsgHandler;
import com.jfinal.plugin.zbus.sender.Sender;

/**
 * JFinal plugin that wires the zbus message broker into an application.
 *
 * <p>
 * The plugin keeps two registries of message handlers: one keyed by MQ name
 * (queue mode) and one keyed by MQ name and topic (publish/subscribe mode).
 * Handlers can be registered programmatically or discovered by scanning the
 * configured packages for {@link TMsgHandler} subtypes annotated with
 * {@link Handler}. {@link #start()} creates one {@link Consumer} per registered
 * handler, {@link #stop()} closes them again.
 *
 * <p>
 * Note that the broker and the list of senders are <em>static</em>: they are
 * shared by every instance of this plugin in the same class loader, and
 * {@link #stop()} on any instance closes them.
 *
 * @author 李飞
 * @since V1.0.0
 */
public class ZbusPlugin implements IPlugin {

	/**
	 * Logger.
	 */
	private static final Log LOG = Log.getLog("ZbusPlugin");

	/**
	 * Name of the MQ on which {@link Event}-annotated handlers are registered.
	 */
	private static final String EVENT_MQ = "_mq_event_";

	/**
	 * Queue-mode handlers: MQ name -> handler.
	 */
	private final Map<String, TMsgHandler<?>> mqNameTMsgHandlerMap = new HashMap<String, TMsgHandler<?>>();

	/**
	 * Publish/subscribe handlers: MQ name -> (topic -> handler).
	 */
	private final Map<String, Map<String, TMsgHandler<?>>> mqNamePubSubTMsgHandlerMap = new HashMap<String, Map<String, TMsgHandler<?>>>();

	/**
	 * Consumers created by {@link #start()}; closed and cleared by {@link #stop()}.
	 */
	private final List<Consumer> consumerList = new ArrayList<Consumer>();

	/**
	 * Senders that obtained a producer through {@link #createProducer}; closed by
	 * {@link #stop()}. Shared by all plugin instances, so every access is guarded
	 * by synchronizing on the list itself.
	 */
	private final static List<Sender<?>> senderList = new ArrayList<Sender<?>>();

	/**
	 * The shared broker. Declared volatile because it is read without holding the
	 * lock (first check in {@link #ensureBroker()} and in {@link #createProducer});
	 * writes happen while holding the {@code ZbusPlugin.class} monitor.
	 */
	private static volatile Broker broker = null;

	/**
	 * Configuration used when the broker has to be created.
	 */
	private final BrokerConfig brokerConfig;

	/**
	 * Comma-separated list of packages scanned for annotated handlers; blank or
	 * {@code null} disables scanning.
	 */
	private final String scanRootPackage;

	/**
	 * Creates a plugin for the broker address {@code "jvm"} without annotation
	 * scanning.
	 */
	public ZbusPlugin() {
		this("jvm");
	}

	/**
	 * Creates a plugin for the given broker address without annotation scanning.
	 *
	 * @param brokerAddress
	 *            address passed to {@link BrokerConfig}
	 */
	public ZbusPlugin(String brokerAddress) {
		this(brokerAddress, null);
	}

	/**
	 * Creates a plugin for the given broker address and scans the given packages
	 * for annotated handlers.
	 *
	 * @param brokerAddress
	 *            address passed to {@link BrokerConfig}
	 * @param scanRootPackage
	 *            comma-separated package names to scan; blank or {@code null}
	 *            disables scanning
	 */
	public ZbusPlugin(String brokerAddress, String scanRootPackage) {
		this(new BrokerConfig(brokerAddress), scanRootPackage);
	}

	/**
	 * Creates a plugin from a broker configuration without annotation scanning.
	 *
	 * @param config
	 *            broker configuration
	 */
	public ZbusPlugin(BrokerConfig config) {
		this(config, null);
	}

	/**
	 * Creates a plugin from a broker configuration, creates the shared broker if
	 * it does not exist yet and registers the handlers found by annotation
	 * scanning.
	 *
	 * @param config
	 *            broker configuration
	 * @param scanRootPackage
	 *            comma-separated package names to scan; blank or {@code null}
	 *            disables scanning
	 * @throws RuntimeException
	 *             if the broker cannot be created or a discovered handler cannot
	 *             be instantiated
	 */
	public ZbusPlugin(BrokerConfig config, String scanRootPackage) {
		this.brokerConfig = config;
		this.scanRootPackage = scanRootPackage;
		// Create the broker first, then register annotated handlers.
		ensureBroker();
		autoLoadByAnnotation();
	}

	/**
	 * Creates a {@link Producer} on the shared broker, declares its MQ and
	 * remembers the sender so that {@link #stop()} can close it.
	 *
	 * @param sender
	 *            the sender requesting the producer; recorded only after the MQ
	 *            was created successfully
	 * @param mq
	 *            MQ name
	 * @param mqMode
	 *            MQ mode
	 * @return the new producer
	 * @throws IllegalStateException
	 *             if no broker is available (no plugin has been constructed yet,
	 *             or the plugin has been stopped)
	 * @throws InterruptedException
	 *             propagated from {@code Producer.createMQ()}
	 * @throws IOException
	 *             propagated from {@code Producer.createMQ()}
	 * @since V1.0.0
	 */
	public static Producer createProducer(Sender<?> sender, String mq, MqMode mqMode)
			throws IOException, InterruptedException {
		// Read the volatile field once so the null check and the use agree.
		Broker current = broker;
		if (current == null) {
			throw new IllegalStateException(
					"zbus broker is not available; create a ZbusPlugin before creating a Producer (mq=" + mq + ")");
		}
		Producer producer = new Producer(current, mq, mqMode);
		producer.createMQ();
		synchronized (senderList) {
			senderList.add(sender);
		}
		return producer;
	}

	/**
	 * Registers the handler invoked for messages arriving on a queue-mode MQ. An
	 * existing handler for the same MQ is replaced and a warning is logged.
	 *
	 * @param mq
	 *            MQ name
	 * @param msgHandler
	 *            callback invoked when a message arrives
	 * @since V1.0.0
	 */
	public void registerMqMsgHandler(String mq, TMsgHandler<?> msgHandler) {
		if (mqNameTMsgHandlerMap.containsKey(mq)) {
			LOG.warn("(mq=" + mq + ")对应的消息处理器已存在!");
		}
		mqNameTMsgHandlerMap.put(mq, msgHandler);
	}

	/**
	 * Registers the handler invoked for messages arriving on a topic of a
	 * publish/subscribe MQ. An existing handler for the same MQ and topic is
	 * replaced and a warning is logged.
	 *
	 * @param mq
	 *            MQ name
	 * @param topic
	 *            topic name
	 * @param msgHandler
	 *            callback invoked when a message arrives
	 * @since V1.0.0
	 */
	public void registerTopicMsgHandler(String mq, String topic, TMsgHandler<?> msgHandler) {
		// Look up (or lazily create) the topic -> handler map of this MQ.
		Map<String, TMsgHandler<?>> topicHandlers = this.mqNamePubSubTMsgHandlerMap.get(mq);
		if (topicHandlers == null) {
			topicHandlers = new HashMap<String, TMsgHandler<?>>();
			this.mqNamePubSubTMsgHandlerMap.put(mq, topicHandlers);
		}
		if (topicHandlers.containsKey(topic)) {
			LOG.warn("(mq=" + mq + ",topic=" + topic + ")对应的消息处理器已存在!");
		}
		topicHandlers.put(topic, msgHandler);
	}

	/**
	 * Creates the shared broker from this instance's configuration if none exists.
	 * The broker is static, so the lock is the class monitor rather than
	 * {@code this}; otherwise two plugin instances could each create a broker.
	 *
	 * @throws RuntimeException
	 *             wrapping the {@link IOException} raised while creating the
	 *             broker
	 * @since V1.0.0
	 */
	private void ensureBroker() {
		if (broker != null) {
			return;
		}
		synchronized (ZbusPlugin.class) {
			if (broker != null) {
				return;
			}
			try {
				broker = new ZbusBroker(this.brokerConfig);
				LOG.info("创建broker成功(brokerAddress=" + this.brokerConfig.getBrokerAddress() + ")");
			} catch (IOException e) {
				throw new RuntimeException(e.getMessage(), e);
			}
		}
	}

	/**
	 * Creates and starts one queue-mode consumer per registered MQ handler.
	 *
	 * @throws IOException
	 *             if a consumer cannot be started
	 */
	private void startMqConsumer() throws IOException {
		for (Entry<String, TMsgHandler<?>> entry : this.mqNameTMsgHandlerMap.entrySet()) {
			String mq = entry.getKey();
			Consumer c = new Consumer(broker, mq, MqMode.MQ);
			c.onMessage(entry.getValue());
			c.start();
			consumerList.add(c);
			LOG.info("创建MQ消费者成功(mq=" + mq + ")");
		}
	}

	/**
	 * Creates and starts one publish/subscribe consumer per registered (MQ, topic)
	 * handler.
	 *
	 * @throws IOException
	 *             if a consumer cannot be started
	 */
	private void startTopicConsumer() throws IOException {
		for (Entry<String, Map<String, TMsgHandler<?>>> mqConfig : this.mqNamePubSubTMsgHandlerMap.entrySet()) {
			String mq = mqConfig.getKey();
			// topic -> handler map of this MQ
			Map<String, TMsgHandler<?>> topicHandlers = mqConfig.getValue();
			for (Entry<String, TMsgHandler<?>> topicConfig : topicHandlers.entrySet()) {
				String topic = topicConfig.getKey();
				Consumer c = new Consumer(broker, mq, MqMode.PubSub);
				c.setTopic(topic);
				c.onMessage(topicConfig.getValue());
				c.start();
				consumerList.add(c);
				LOG.info("创建Topic消费者成功 (mq=" + mq + ",topic=" + topic + ")");
			}
		}
	}

	/**
	 * Makes sure the broker exists and starts a consumer for every registered
	 * handler.
	 *
	 * @return always {@code true}; failures are reported by exception
	 * @throws RuntimeException
	 *             wrapping any exception raised while starting
	 */
	@Override
	public boolean start() {
		try {
			// The broker may have been closed by an earlier stop(); recreate it.
			ensureBroker();
			startMqConsumer();
			startTopicConsumer();
			return true;
		} catch (Exception e) {
			LOG.error(e.getMessage(), e);
			throw new RuntimeException(e.getMessage(), e);
		}
	}

	/**
	 * Closes all consumers of this instance, all recorded senders and the shared
	 * broker. Shutdown is best effort: a failure to close one resource is logged
	 * and does not prevent the remaining resources from being closed.
	 *
	 * @return {@code true} if everything was closed without error, {@code false}
	 *         if at least one close operation failed
	 */
	@Override
	public boolean stop() {
		boolean allClosed = true;

		// Close consumers.
		for (Consumer c : consumerList) {
			try {
				c.close();
			} catch (Exception e) {
				LOG.error(e.getMessage(), e);
				allClosed = false;
			}
		}
		consumerList.clear();

		// Close senders. Work on a snapshot so that close() runs outside the lock.
		List<Sender<?>> senders;
		synchronized (senderList) {
			senders = new ArrayList<Sender<?>>(senderList);
			senderList.clear();
		}
		for (Sender<?> sender : senders) {
			try {
				sender.close();
			} catch (Exception e) {
				LOG.error(e.getMessage(), e);
				allClosed = false;
			}
		}

		// Close the broker; always drop the reference so start() can recreate it.
		synchronized (ZbusPlugin.class) {
			Broker current = broker;
			if (current != null) {
				try {
					current.close();
				} catch (Exception e) {
					LOG.error(e.getMessage(), e);
					allClosed = false;
				} finally {
					broker = null;
				}
			}
		}
		return allClosed;
	}

	/**
	 * Registers the handler as a topic handler on the event MQ if its class
	 * carries {@link Event}. The topic name is the value returned by the handler's
	 * {@code getTypeClassName()}.
	 *
	 * @param mc
	 *            handler class
	 * @param handler
	 *            handler instance
	 */
	@SuppressWarnings("rawtypes")
	private void autoLoadEvent(Class<? extends TMsgHandler> mc, TMsgHandler<?> handler) {
		Event event = mc.getAnnotation(Event.class);
		if (event == null) {
			return;
		}
		String topicName = handler.getTypeClassName();
		this.registerTopicMsgHandler(EVENT_MQ, topicName, handler);
		LOG.info("通过注解自动加载Event(Topic)消息处理器( mq=" + EVENT_MQ + ",topic=" + topicName + ",handler=" + mc.getName() + " )");
	}

	/**
	 * Registers the handler for every MQ named by {@link Mq} and {@link Mqs} on
	 * its class.
	 *
	 * @param mc
	 *            handler class
	 * @param handler
	 *            handler instance
	 */
	@SuppressWarnings("rawtypes")
	private void autoLoadMq(Class<? extends TMsgHandler> mc, TMsgHandler<?> handler) {
		Mq mq = mc.getAnnotation(Mq.class);
		Mqs mqs = mc.getAnnotation(Mqs.class);
		// Single MQ.
		if (mq != null) {
			this.registerMqMsgHandler(mq.value(), handler);
			LOG.info("通过注解自动加载MQ消息处理器( mq=" + mq.value() + ",handler=" + mc.getName() + " )");
		}
		// Several MQs.
		if (mqs != null) {
			for (String mqName : mqs.value()) {
				this.registerMqMsgHandler(mqName, handler);
				LOG.info("通过注解自动加载MQ消息处理器( mq=" + mqName + ",handler=" + mc.getName() + " )");
			}
		}
	}

	/**
	 * Registers the handler for every (MQ, topic) pair named by {@link Topic} and
	 * {@link Topics} on its class.
	 *
	 * @param mc
	 *            handler class
	 * @param handler
	 *            handler instance
	 */
	@SuppressWarnings("rawtypes")
	private void autoLoadTopic(Class<? extends TMsgHandler> mc, TMsgHandler<?> handler) {
		Topic topic = mc.getAnnotation(Topic.class);
		Topics topics = mc.getAnnotation(Topics.class);
		// Single topic.
		if (topic != null) {
			this.registerTopicMsgHandler(topic.mq(), topic.topic(), handler);
			LOG.info("通过注解自动加载Topic消息处理器( mq=" + topic.mq() + ",topic=" + topic.topic() + ",handler=" + mc.getName()+ " )");
		}
		// Several topics.
		if (topics != null) {
			for (Topic t : topics.value()) {
				this.registerTopicMsgHandler(t.mq(), t.topic(), handler);
				LOG.info("通过注解自动加载Topic消息处理器( mq=" + t.mq() + ",topic=" + t.topic() + ",handler=" + mc.getName()+ " )");
			}
		}
	}

	/**
	 * Splits a comma-separated package list, trimming surrounding whitespace and
	 * dropping empty entries, so that input such as {@code "a.b, c.d"} or
	 * {@code "a.b,,c.d"} yields clean package names.
	 *
	 * @param packages
	 *            comma-separated package names, not {@code null}
	 * @return the non-blank package names in their original order
	 */
	private static String[] splitPackages(String packages) {
		List<String> names = new ArrayList<String>();
		for (String part : packages.split(",")) {
			String name = part.trim();
			if (name.length() > 0) {
				names.add(name);
			}
		}
		return names.toArray(new String[names.size()]);
	}

	/**
	 * Scans the configured packages for {@link TMsgHandler} subtypes, instantiates
	 * those annotated with {@link Handler} through their no-argument constructor
	 * and registers them according to their {@link Event}, {@link Mq}/{@link Mqs}
	 * and {@link Topic}/{@link Topics} annotations. Does nothing when no scan
	 * package is configured.
	 *
	 * @throws RuntimeException
	 *             wrapping any exception raised while instantiating a handler
	 */
	@SuppressWarnings("rawtypes")
	private void autoLoadByAnnotation() {
		if (StrKit.isBlank(this.scanRootPackage)) {
			return;
		}
		Reflections reflections = new Reflections(new ConfigurationBuilder()
				.addUrls(ClasspathHelper.forClass(TMsgHandler.class)).forPackages(splitPackages(scanRootPackage))
				.setScanners(new SubTypesScanner(), new TypeAnnotationsScanner()).useParallelExecutor());

		Set<Class<? extends TMsgHandler>> handlerClasses = reflections.getSubTypesOf(TMsgHandler.class);
		for (Class<? extends TMsgHandler> mc : handlerClasses) {
			// Only classes explicitly marked with @Handler take part in auto loading.
			if (mc.getAnnotation(Handler.class) == null) {
				continue;
			}
			TMsgHandler<?> handler;
			try {
				handler = (TMsgHandler<?>) mc.newInstance();
			} catch (Exception e) {
				LOG.error(e.getMessage(), e);
				throw new RuntimeException(e.getMessage(), e);
			}
			this.autoLoadEvent(mc, handler);
			this.autoLoadMq(mc, handler);
			this.autoLoadTopic(mc, handler);
		}
	}
}
